const fs = require('fs');
const nodeUtil = require('util');
const timer = require('../timer')
const util = require('../util')
const mongo = require('../db_helper')
const config = require('../config')
const ObjectId = require('mongodb').ObjectId

const plugins = {
	spark: require('./spark'),
  datax: require('./datax'),
  shell: require('./shell'),
  streaming: require('./streaming')
}

const logger = util.getLogger('task.index')
const loadTimerName = 'load-tasks'
const unlink = nodeUtil.promisify(fs.unlink);   //删除文件
const readdir = nodeUtil.promisify(fs.readdir);   //读取目录内容
const stat = nodeUtil.promisify(fs.stat);

//运行job
const runJob = async (job, role = 'timer') => {
  //保存日志 
  let log = {
    role,
    jobId: job._id,
    title: job.title,
    beginTime: new Date(),
    group: job.group.map(x => Object.assign({state: '0'}, x))
  }
  let result = await mongo.logs.insertOne(log)
  let logId = log._id = result.insertedId

  //更新开始时间
  logger.info(`开始运行任务：${job.title}`)
  mongo.jobs.updateOne({
    _id: new ObjectId(job._id)
  }, {
    $set: {
      lastRunTime: new Date(),
      lastLog: log._id
    }
  })

  //加载子任务
  if (log.group && log.group.length) {
    for (x of log.group) {
      x.beginTime = new Date()
      logger.info(`运行子任务：${job.title}#${x.id} - ${x.title} - ${x.type}`)
      try{
        x.log = await plugins[x.type].run(x, job._id)    //任务运行插件
        x.state = '1'
      } catch(e) {
        //任务运行异常：
        logger.error(`任务 ${job.title}#${x.id} - ${x.title} - ${x.type} 运行异常：` + e)
        if (e.message) {
          x.log = { system: `${e.message}\n${e.stack}` }
        } else {
          x.log = e
        }
        x.state = '2'
        break;
      }finally{
        x.endTime = new Date()
        await mongo.logs.replaceOne({
          _id: logId
        }, log)
      }
    }
  }
  log.endTime = new Date()
  await mongo.logs.replaceOne({
    _id: logId
  }, log)

  logger.info(`任务：${job.title} 运行结束。`)
}

//启动定时任务
const startRunTask = () => {  

  //加载任务定时任务，成功后中止，失败后重复尝试
  timer.addTask({
    name: loadTimerName,
    cycle: config.mongo.options.connectTimeoutMS + config.mongo.options.socketTimeoutMS,
    // begin: new Date().getTIme() + config.mongo.options.connectTimeoutMS,
    fun: async () => {
      try{
        mongo.connect()   //连接数据库
        //读取所有任务
        let jobList = await mongo.jobs.find({ status: '1' }).toArray()
        if (jobList && jobList.length) {
          timer.removeTask(loadTimerName)   //成功后删除
          jobList = jobList.filter(job => {
            let streamingJobs =  job.group.filter(x => x.type === 'streaming')
            return ! (streamingJobs && streamingJobs.length)
          })
          logger.info(`加载 ${jobList.length} 个定时任务。`)
          jobList.forEach(job => addJob(job))
        }
      } catch (e) {
        logger.error('无法读取任务列表：', e)
      }
    }
  })
}

//启动定时日志清理
const startClearLog = () => {
  let begin = new Date();
  begin.setHours(1);
  begin.setMinutes(0);
  begin.setSeconds(0);
  timer.addTask({
    name: 'clear-logs',
    begin,
    cycle: 86400000,
    fun: async () => {
      try{
        let t = new Date();
        t.setMonth(t.getMonth() - 1)
        logger.info(`开始运行日志删除任务，删除 ${t.format('yyyy-MM-dd')} 前的日志`)
        //删除一个月前的日志
        let result = await mongo.logs.deleteMany({
          beginTime: {
            $lt: t
          }
        })
        logger.info(`删除了 ${result.deletedCount} 条日志。`)
      } catch(e) {
        logger.error('清理日志任务异常：', e)
      }
    }
  })
}

//清理30天之前的日志输出
const startClearLogFile = () => {
  let begin = new Date();
  begin.setHours(1);
  begin.setMinutes(1);
  begin.setSeconds(0);
  timer.addTask({
    name: 'clear-logs-file',
    begin,
    cycle: 86400000,
    fun: async () => {
      try{
        let delTime = new Date();
        delTime.setMonth(t.getMonth() - 1)
        //任务日志目录
        let path = `${config.upload.publishDir}/log`
        if (await util.exists(path)){
          let files = await readdir(path);
          let count = files.length;
          let fail = 0;
          let delCount = 0;
          let clearTimestrap = delTime.getTime();    //此时间之前的文件删除
          logger.info(`删除修改时间 ${delTime.format('yyyy-MM-dd HH:mm:ss')} 之前的文件`)
          for(file of files){
            let fix = file.split('.').pop().toLowerCase();
            let filepath = `${path}/${file}`;
            let fileStat = await stat(filepath);
            if (fileStat.ctimeMs < clearTimestrap){   //使用修改时间比较
                delCount ++;
              //删除文件
              try{
                logger.debug(`删除已传输的文件：${filepath}`);
                await unlink(filepath);
              }catch(e){
                logger.error(`文件：${filepath}，删除失败：`, e);
                fail ++;
              }
            }
          }
          logger.info(`清理目录 ${path} ，一共 ${count} 个文件，需删除 ${delCount}，删除失败 ${fail}`)
        }
      } catch(e) {
        logger.error('清理日志文件任务异常：', e)
      }
    }
  })
}

//启动
const start = () => {
  startRunTask()
  startClearLog()
  startClearLogFile()
}

//添加job任务
const addJob = (job) => {
  if (job.group.filter(x => {return x.type === 'streaming'}).length) {
    logger.info(`包含 streaming 任务，不创建定时器直接运行。`)
    runJob(job)
  }else{
    logger.info(`添加任务定时器:`, job)
    timer.addTask({
      name: `${job.title}-${job._id}`,
      begin: new Date(job.beginTime),
      cycle: Number(job.cycle),
      fun: runJob,
      args: job
    })
  }
}

//删除任务定时器
const remove = (job) => {
  let taskName = `${job.title}-${job._id}`
  logger.info('删除任务定时器：' + taskName)
  timer.removeTask(taskName)
  if (job.group && job.group.length) {
    job.group.forEach(x => plugins[x.type].remove(x, job._id))
  }
}

module.exports = {
  runJob, start, addJob, remove
}